feat: support manifest&list writer part 1 #169
feat: support manifest&list writer part 1 #169 — dongxiao1198 wants to merge 4 commits into apache:main from
Conversation
1. Add the Avro writer and factory (without the write function, since the converter in pull/166 is not finished yet). 2. Add the internal implementation definitions for the manifest and manifest list writers.
wgtmac
left a comment
There was a problem hiding this comment.
I'd suggest splitting this PR into separate ones to make review easier. Perhaps one for the Avro writer, one for the manifest list writer, and one for the manifest file writer.
| class ICEBERG_EXPORT Metrics { | ||
| public: | ||
| Metrics() = default; | ||
|
|
||
| explicit Metrics(int64_t row_count, | ||
| std::unordered_map<int64_t, int64_t> column_sizes = {}, | ||
| std::unordered_map<int64_t, int64_t> value_counts = {}, | ||
| std::unordered_map<int64_t, int64_t> null_value_counts = {}, | ||
| std::unordered_map<int64_t, int64_t> nan_value_counts = {}, | ||
| std::unordered_map<int64_t, Literal> lower_bounds = {}, | ||
| std::unordered_map<int64_t, Literal> upper_bounds = {}) | ||
| : row_count_(row_count), | ||
| column_sizes_(std::move(column_sizes)), | ||
| value_counts_(std::move(value_counts)), | ||
| null_value_counts_(std::move(null_value_counts)), | ||
| nan_value_counts_(std::move(nan_value_counts)), | ||
| lower_bounds_(std::move(lower_bounds)), | ||
| upper_bounds_(std::move(upper_bounds)) {} | ||
|
|
||
| private: | ||
| int64_t row_count_ = 0; | ||
| std::unordered_map<int64_t, int64_t> column_sizes_; | ||
| std::unordered_map<int64_t, int64_t> value_counts_; | ||
| std::unordered_map<int64_t, int64_t> null_value_counts_; | ||
| std::unordered_map<int64_t, int64_t> nan_value_counts_; | ||
| std::unordered_map<int64_t, Literal> lower_bounds_; | ||
| std::unordered_map<int64_t, Literal> upper_bounds_; | ||
| }; |
There was a problem hiding this comment.
| class ICEBERG_EXPORT Metrics { | |
| public: | |
| Metrics() = default; | |
| explicit Metrics(int64_t row_count, | |
| std::unordered_map<int64_t, int64_t> column_sizes = {}, | |
| std::unordered_map<int64_t, int64_t> value_counts = {}, | |
| std::unordered_map<int64_t, int64_t> null_value_counts = {}, | |
| std::unordered_map<int64_t, int64_t> nan_value_counts = {}, | |
| std::unordered_map<int64_t, Literal> lower_bounds = {}, | |
| std::unordered_map<int64_t, Literal> upper_bounds = {}) | |
| : row_count_(row_count), | |
| column_sizes_(std::move(column_sizes)), | |
| value_counts_(std::move(value_counts)), | |
| null_value_counts_(std::move(null_value_counts)), | |
| nan_value_counts_(std::move(nan_value_counts)), | |
| lower_bounds_(std::move(lower_bounds)), | |
| upper_bounds_(std::move(upper_bounds)) {} | |
| private: | |
| int64_t row_count_ = 0; | |
| std::unordered_map<int64_t, int64_t> column_sizes_; | |
| std::unordered_map<int64_t, int64_t> value_counts_; | |
| std::unordered_map<int64_t, int64_t> null_value_counts_; | |
| std::unordered_map<int64_t, int64_t> nan_value_counts_; | |
| std::unordered_map<int64_t, Literal> lower_bounds_; | |
| std::unordered_map<int64_t, Literal> upper_bounds_; | |
| }; | |
| struct ICEBERG_EXPORT Metrics { | |
| int64_t row_count = 0; | |
| std::unordered_map<int64_t, int64_t> column_sizes; | |
| std::unordered_map<int64_t, int64_t> value_counts; | |
| std::unordered_map<int64_t, int64_t> null_value_counts; | |
| std::unordered_map<int64_t, int64_t> nan_value_counts; | |
| std::unordered_map<int64_t, Literal> lower_bounds; | |
| std::unordered_map<int64_t, Literal> upper_bounds; | |
| }; |
What about making it a simple struct to enable aggregate initialization?
|
|
||
| #pragma once | ||
|
|
||
| /// \file iceberg/metrics.h |
There was a problem hiding this comment.
Add a comment here? E.g., "Iceberg file format metrics".
| const std::vector<ManifestEntry>& entries) const = 0; | ||
|
|
||
| /// \brief Close writer and flush to storage. | ||
| virtual void Close() = 0; |
There was a problem hiding this comment.
| virtual void Close() = 0; | |
| virtual Status Close() = 0; |
| virtual Status WriteManifestFiles(const std::vector<ManifestFile>& files) const = 0; | ||
|
|
||
| /// \brief Close writer and flush to storage. | ||
| virtual void Close() = 0; |
There was a problem hiding this comment.
| virtual void Close() = 0; | |
| virtual Status Close() = 0; |
| /// When available, this information is used for planning scan tasks whose boundaries | ||
| /// are determined by these offsets. The returned list must be sorted in ascending order | ||
| /// Only valid after the file is closed. | ||
| virtual std::vector<int64_t> splitOffsets() = 0; |
There was a problem hiding this comment.
| virtual std::vector<int64_t> splitOffsets() = 0; | |
| virtual std::vector<int64_t> split_offsets() = 0; |
| /// \brief Get the file statistics. | ||
| virtual std::shared_ptr<Metrics> metrics() = 0; | ||
|
|
||
| /// \brief Get the file length. |
There was a problem hiding this comment.
| /// \brief Get the file length. | |
| /// \brief Get the file length. | |
| /// Only valid after the file is closed. |
| /// \return Status of write results. | ||
| virtual Status Write(ArrowArray data) = 0; | ||
|
|
||
| /// \brief Get the file statistics. |
There was a problem hiding this comment.
| /// \brief Get the file statistics. | |
| /// \brief Get the file statistics. | |
| /// Only valid after the file is closed. |
| virtual Status Write(ArrowArray data) = 0; | ||
|
|
||
| /// \brief Get the file statistics. | ||
| virtual std::shared_ptr<Metrics> metrics() = 0; |
There was a problem hiding this comment.
| virtual std::shared_ptr<Metrics> metrics() = 0; | |
| virtual Metrics metrics() = 0; |
Perhaps we can just return a simple struct instead of a shared_ptr?
| } // namespace | ||
|
|
||
| // A stateful context to keep track of the writing progress. | ||
| struct WriteContext {}; |
There was a problem hiding this comment.
Do we really need a context? Writer is much simpler than the reader impl.
| auto root = std::make_shared<::avro::NodeRecord>(); | ||
| ToAvroNodeVisitor visitor; | ||
| for (const auto& field : write_schema_->fields()) { | ||
| ::avro::NodePtr node; | ||
| ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node)); | ||
| root->addLeaf(node); | ||
| } |
There was a problem hiding this comment.
| auto root = std::make_shared<::avro::NodeRecord>(); | |
| ToAvroNodeVisitor visitor; | |
| for (const auto& field : write_schema_->fields()) { | |
| ::avro::NodePtr node; | |
| ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node)); | |
| root->addLeaf(node); | |
| } | |
| ::avro::NodePtr root; | |
| ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root)); |
OK, I will separate them into 2 PRs.
1. Add the Avro writer and factory (without the write function, since the converter in pull/166 is not finished yet).
2. Add the internal implementation definitions for the manifest and manifest list writers.